package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Example job: reads {@link TUser} records through {@code JdbcReader}, turns every
 * record into a {@code (sex, 1)} tuple, registers the tuple stream as the temporary
 * view {@code temp_table} and prints the rows returned by a SQL query over that view.
 *
 * @Author JL
 * @Date 2020/09/19
 * @Version V1.0
 */
public class DataSourceToSink {

    /**
     * Builds the source -> flatMap -> temporary view -> SQL query pipeline and prints
     * the query result to standard output.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception kept in the signature so existing callers are unaffected
     */
    public static void main(String[] args) throws Exception {
        // Streaming environment and the table environment bridged onto it.
        // (The batch ExecutionEnvironment / BatchTableEnvironment pair that used to be
        // created here was never used, so it is no longer instantiated.)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);

        // 1. Attach the input source.
        DataStreamSource<TUser> fromDataStream = env.addSource(new JdbcReader());

        // 2. Emit exactly one (sex, 1) tuple per user. An anonymous class is used rather
        //    than a lambda so the Tuple2 type parameters stay visible to Flink.
        DataStream<Tuple2<String, Integer>> toDataStream = fromDataStream
                .flatMap(new FlatMapFunction<TUser, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(TUser user, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(Tuple2.of(String.valueOf(user.getSex()), 1));
                    }
                });
        // Disabled: per-key counting over windows of 5 elements.
        //        .keyBy((KeySelector<Tuple2<String, Integer>, String>) k -> k.f0)
        //        .countWindow(5)
        //        .sum(1);

        // Expose the tuple stream to SQL under the name "temp_table" with columns f0 and f1.
        sTableEnv.createTemporaryView("temp_table", toDataStream, $("f0"), $("f1"));
        Table table = sTableEnv.sqlQuery("SELECT f0,f1 as f1 FROM temp_table");

        // Table.execute() submits the job itself; print() then writes the collected rows
        // to stdout. NOTE(review): if JdbcReader never finishes, print() does not return.
        TableResult result = table.execute();
        result.print();

        // 3. Disabled: write the tuples to the record table.
        // toDataStream.addSink(new JdbcWriter());
        //
        // No env.execute(...) here on purpose: the query above has already been submitted
        // by Table.execute(), and with no DataStream sink attached a further env.execute()
        // would run source -> flatMap a second time and throw the output away. If the sink
        // above is re-enabled, add `env.execute("flink mysql to mysql");` back after it.
    }

}
